Broadcast Join

A broadcast join is very efficient when joining a large dataset with a small one. Spark splits data across the nodes in a cluster so that multiple machines can process it in parallel. A regular join between two distributed datasets normally has to shuffle rows so that matching keys end up on the same node. Spark can instead “broadcast” a small DataFrame by sending all of its data to every node in the cluster. Once the small DataFrame has been broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.

Let's say Table B is the smaller table, which means it is broadcast to each executor node. Each partition of Table A then fetches Table B's data through the block manager. For each record in the partition, Spark uses the join key to look up the corresponding records in Table B and combines them according to the join type.
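The lookup described above can be sketched with plain Scala collections. This is only a conceptual illustration, not Spark's actual implementation: the names Emp, Dept, partitionsA, broadcastB and joinPartition are made up for the example, and an ordinary Map stands in for the copy of Table B that each executor receives.

```scala
// Conceptual sketch only: plain Scala collections stand in for Spark partitions.
case class Emp(name: String, depId: String)
case class Dept(id: String, name: String)

// "Table A": the large side, already split into partitions across executors.
val partitionsA: Seq[Seq[Emp]] = Seq(
  Seq(Emp("Mary", "IT"), Emp("Peter", "MKT")),
  Seq(Emp("Paul", "IT"), Emp("Steve", "Intern"))
)

// "Table B": the small side, indexed by join key. In this sketch the Map plays
// the role of the broadcast copy available on every executor.
val tableB: Seq[Dept] = Seq(
  Dept("IT", "IT Department"),
  Dept("MKT", "Marketing Department")
)
val broadcastB: Map[String, Seq[Dept]] = tableB.groupBy(_.id)

// Inner join: each partition probes the local copy of Table B by join key.
// Rows of Table A never move between partitions.
def joinPartition(part: Seq[Emp]): Seq[(Emp, Dept)] =
  for {
    e <- part
    d <- broadcastB.getOrElse(e.depId, Seq.empty)
  } yield (e, d)

val joined: Seq[Seq[(Emp, Dept)]] = partitionsA.map(joinPartition)
```

Because this is an inner join, the record for Steve is dropped: its key "Intern" has no match in Table B. A left outer join would instead keep that record with an empty right side.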

  • For Spark to broadcast a table automatically, the table must be smaller than the value configured in spark.sql.autoBroadcastJoinThreshold (default 10 MB).
  • The base table cannot be broadcast. For example, in a left outer join only the right table can be broadcast.
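As a hedged illustration of the two points above, the following sketch reads the current threshold and broadcasts the right table of a left outer join. It assumes a local SparkSession (in spark-shell, spark already exists). The emp and dept DataFrames and their column names are hypothetical sample data, not part of the example that follows.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-left-outer")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val emp = Seq(("Mary", "IT"), ("Steve", "Intern")).toDF("name", "depId")
val dept = Seq(("IT", "IT Department"), ("FIN", "Finance & Controlling")).toDF("id", "deptName")

// Current automatic-broadcast threshold, returned as a string.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// emp is the base (left) table of the left outer join,
// so the broadcast hint goes on the right table.
val joined = emp.join(broadcast(dept), $"depId" === $"id", "left_outer")
joined.explain()
```

The printed plan would be expected to show a broadcast hash join built from the right side. Every row of emp is kept, and rows with no matching department get null values in the dept columns.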
case class Employee(name:String, age:Int, depId: String)
case class Department(id: String, name: String)

val empRDD = sc.parallelize(
  Seq(Employee("Mary", 33, "IT"),
      Employee("Paul", 45, "IT"),
      Employee("Peter", 26, "MKT"),
      Employee("Jon", 34, "MKT"),
      Employee("Sarah", 29, "IT"),
      Employee("Steve", 21, "Intern")))

val deptRDD = sc.parallelize(
  Seq(Department("IT", "IT Department"),
      Department("MKT", "Marketing Department"),
      Department("FIN", "Finance & Controlling")))

val empDF = empRDD.toDF
val deptDF = deptRDD.toDF

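Result without Broadcast Variable
// No broadcast hint: explain() prints the physical plan that Spark chooses on its own.
// explain() returns Unit, so there is nothing useful to assign to a val.
empDF.join(deptDF, $"depId" === $"id", "inner").explain()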




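Result with Broadcast Variable
import org.apache.spark.sql.functions.broadcast
// broadcast(deptDF) marks the smaller DataFrame to be broadcast to every executor.
// explain() returns Unit, so there is nothing useful to assign to a val.
empDF.join(broadcast(deptDF), $"depId" === $"id", "inner").explain()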


Configuring Spark Auto Broadcast Join
We can set the maximum size of a DataFrame that Spark will broadcast automatically in a join. This is done with the spark.sql.autoBroadcastJoinThreshold setting in the Spark SQL configuration. A suitable value depends on the memory available on the driver and the executors, because the broadcast DataFrame has to fit in memory on each of them.

// Enable automatic broadcast joins and set the threshold: the maximum size,
// in bytes, of a DataFrame to broadcast (104857600 bytes = 100 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)

// Disable automatic broadcast joins (an explicit broadcast() hint still applies)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
